-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-2205][SPARK-7871][SQL]Advoid redundancy exchange #6682
Conversation
Can one of the admins verify this patch? |
@jeanlyn Thanks for working on it. However, I am not sure this is the change we want for long term. To fundamentally address this issue, we need to consider what attributes are equivalent (actually I am planning to do it by myself). I am not sure what |
@yhuai ,Thanks for comment.In the current implementation of table a(key string,value string)
table b(key string,value string)
table c(key string,value string)
table d(key string,value string)
table e(key string,value string)
select a.value,b.value,c.value,d.value,e.value from
a join b
on a.key = b.key
join c
on a.key = c.key
join d
on b.key = d.key
join e
on c.key = e.key we got
But actually
This will greatly improve the efficiency,especially for the outer join. We had some real world cases of multiway full outer join with the same key,it produce a lot of null key(causing data skew,while hive doesn't) with redundancy shuffle and ran OOM finally. |
Thanks for the example. For your query, if you write it as
You will find those unnecessary exchange operators are gone (I am just pointing out a workaround. I am not questioning the issue at all.). I mean we really need have the notion that all of For the case of full outer joins, I guess you meant all records from all tables with a null key are shuffled to the same reducer, right? btw, what is the plan generated by Hive? |
@yhuai .Yes,the full outer join cases shuffled the null key to the same reducer in spark-sql ,and the hive plan generated like: explain select a.value,b.value,c.value,d.value from
a full outer join b
on a.key = b.key
full outer join c
on a.key = c.key
full outer join d
on a.key = d.key
STAGE PLANS:
Stage: Stage-1
Map Reduce
Map Operator Tree:
TableScan
alias: a
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
Reduce Output Operator
key expressions: key (type: string)
sort order: +
Map-reduce partition columns: key (type: string)
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
value expressions: value (type: string)
TableScan
alias: b
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
Reduce Output Operator
key expressions: key (type: string)
sort order: +
Map-reduce partition columns: key (type: string)
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
value expressions: value (type: string)
TableScan
alias: c
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
Reduce Output Operator
key expressions: key (type: string)
sort order: +
Map-reduce partition columns: key (type: string)
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
value expressions: value (type: string)
TableScan
alias: d
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
Reduce Output Operator
key expressions: key (type: string)
sort order: +
Map-reduce partition columns: key (type: string)
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
value expressions: value (type: string)
Reduce Operator Tree:
Join Operator
condition map:
Outer Join 0 to 1
Outer Join 0 to 2
Outer Join 0 to 3
keys:
0 key (type: string)
1 key (type: string)
2 key (type: string)
3 key (type: string)
outputColumnNames: _col1, _col6, _col11, _col16
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
Select Operator
expressions: _col1 (type: string), _col6 (type: string), _col11 (type: string), _col16 (type: string)
outputColumnNames: _col0, _col1, _col2, _col3
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink @chenghao-intel has a solution in #6413 |
When only use the output partitioning of
BinaryNode
will probably add unnecessaryExchange
like multiway join.This PR add
meetPartitions
to SparkPlan advoid redundancy exchanges by use to save the partitioning of the node and the child,and will be reset to node partitioning when need shuffle.